并发
并发的优势是?
- 执行速度极大提高
- 为设计类型的程序提供更加易用的模型
并发的问题
当并发执行的任务开始产生交互的时候,一些不可预料的问题就会发生。因此,伴随着并发好处的同时会有一大堆的问题产生。
并发的多面性
什么叫做多面性?
需要并发处理的情况有很多,实现并发的方式也有很多,但是他们并不是一一对应的关系,也就是说,没有一个确切的解决方案,需要随机应变。
并发解决的问题大致可以分为两类:“速度”和“设计可管理性”
速度
如何解决了速度的问题?
对于多处理器,把任务分段在多个处理器上同时执行,很明显是变快了,这很好理解。
但是,并发更多的是处理单处理器上的速度问题,这似乎有点违背常理:在单处理器上,并发处理需要上下文切换,似乎开销比顺序执行的还要大。
让并发存在单处理上变得不可缺少的一个原因是“阻塞”。
什么是阻塞?
当程序因为某个控制范围之外(通常是I/O)的条件而停止了,整个程序都要暂停,也就是阻塞。
如果有并发,我们可以说只是一个任务暂停了,其它的任务还可以执行。
如何处理阻塞问题?
无并发的处理方案:在代码中利用循环周期性的检查阻塞的状态。问题是:
- 代码会非常丑陋
- 无法确保程序员不忘记这种检查
使用进程实现并发:进程之间的任务互不干涉,若某个进程被阻塞了,执行下一个任务,这样的并发程序是没有风险的。
进程实现并发的问题?
进程有数量和开销的限制。
其它的并发实现?
- 使用并发任务彼此隔离的语言(譬如Erlang):这类语言和进程并发类似,而且减少了进程的限制。
- 在顺序性语言(JAVA)上提供了对线程的支持
改进代码设计
如何解决了代码设计管理的问题?
比如仿真设计,在游戏中,每一个角色看起来都是独立的,互相执行的不同的任务,这就需要多线程的设计方式了,使用单线程很难处理这个问题。
JAVA 线程基本使用
- 先定义任务,再把任务赋予到一个线程上驱动,案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {
}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
}
public void run() {
while (countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
public class ThreadTest {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new LiftOff()).start();
}
System.out.println("Waiting for LiftOff");
}
}
每一次执行的结果都不一样。
Thread.yield()是让线程调度器重新分配任务。
- 继承Thread,定义一个具有特定任务的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0; 814
Thinking in
JavaBruce Eckel
public SimpleThread() { // Store the thread name
super(Integer.toString(++threadCount));
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while (true) {
System.out.print(this);
if (--countDown == 0) return;
}
}
}
在构造器中启动线程并不是一个很好的选择。
需要理解的一个思想是:
Runnable存在的意义更多的是一个“任务拥有者”的抽象,里面的run()表示一个“可执行的任务”
Thread存在的意义才贴近系统上的线程,可以看做是一个“任务执行者”,具有开始执行任务的方法start()。
Thread实现Runnable表示Thread也是“任务拥有者”,Thread(Runnable)的含义是用一个新的任务去覆盖Thread本身的默认任务。
Thread中持有target来保存任务拥有者,如果任务拥有者不为空,会调用任务拥有者的方法。
把任务和线程的概念抽离出来,可以更好的处理任务的创建,和对线程运行的理解。
上面的实现方法有什么问题?
- 每一个任务都需要创建一个线程,在物理上创建线程的代价是比较大的,因此我们必须管理好线程。
- 没有返回
如何管理线程?
使用JAVA5的Executor
Executor的实现使用了命令模式,把所有的Runable作为一个任务,采用不同的策略来分配线程执行。
也使用了享元模式,用来操作线程的缓存。
通过查看Thread中的类,可以发现,Thread并没有修改target的方法,那么,线程如何重复利用?
实际上,虽然任务拥有者无法被改变,但是任务拥有者拥有的任务是可以被改变的。Executor中的Worker类就是一类可以把其它Runnable中的任务,接手过来,以此实现线程的重复利用的。
什么是命令模式?
- Command:定义命令的接口,声明执行的方法。
- ConcreteCommand:命令接口实现对象,是“虚”的实现;通常会持有接收者,并调用接收者的功能来完成命令要执行的操作。
- Receiver:接收者,真正执行命令的对象。任何类都可能成为一个接收者,只要它能够实现命令要求实现的相应功能。
- Invoker:要求命令对象执行请求,通常会持有命令对象,可以持有很多的命令对象。这个是客户端真正触发命令并要求命令执行相应操作的地方,也就是说相当于使用命令对象的入口。
- Client:创建具体的命令对象,并且设置命令对象的接收者。注意这个不是我们常规意义上的客户端,而是在组装命令对象和接收者,或许,把这个Client称为装配者会更好理解,因为真正使用命令的客户端是从Invoker来触发执行。
下面的链接有详细的介绍:
[命令模式]https://my.oschina.net/xianggao/blog/618809
我的理解是,把请求抽象成命令,然后可以管理这些命令:
- 可以把各种请求抽象成命令对象来处理。
- 可撤销,让所有的命令都支持可撤销的方法。
- 宏命令,一个命令的集合,批量执行。
- 命令队列,采取多线程的方式来执行命令。
Executor的命令模式?
Runable类就是命令的抽象
Executor是Invoker,执行对命令的管理和执行。
常见的3种Executor子类有:
- CachedThreadPool:这个实例会根据需要,在线程可用时,重用之前构造好的池中线程。如果不存在可用线程,那么会重新创建一个新的线程并将其加入到线程池中。如果线程超过60s(可设置)还未被使用,就会被中止并从缓存中移除。因此,线程池在长时间空闲后不会消耗任何资源。
- FixedThreadPool:这个实例会复用固定数量的线程处理一个共享的无边界队列。有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行ExecutorService.shutdown() 关闭。
- 这个实例只会使用单个工作线程来执行一个无边界的队列。它可以保证认为是按顺序执行的,任何时候都不会有多于一个的任务处于活动状态。
如何解决返回值的问题?
实现带有泛型的Callable接口而不是Runnable接口,实现call()方法。
配合Exceutor使用的话,必须使用ExecutorService.submit()方法调用。
案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i)));
for (Future<String> fs : results)
try { // get() blocks until completion:
System.out.println(fs.get());
} catch (InterruptedException e) {
System.out.println(e);
return;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
因为我们不知道线程在什么时候调用完成,所以返回值被Future封装了,使用isDone()查询Future是否已经完成,完成后可以使用get()方法获取返回值。
之前我们可以看到,线程的执行完全是随机的,和底层的实现机制有关,如何控制这一执行的顺序?
如何控制线程执行的顺序?
设置线程优先级,Thread.currentThread().setPriority(priority);
使用案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // No optimization private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
while (true) { // An expensive, interruptable operation:
for (int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double) i;
if (i % 1000 == 0) Thread.yield();
}
System.out.println(this);
if (--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
注意,优先级的设置最好放到run()方法中,因此可以保证当前任务已经开始执行。
优先级的作用:中断开销比较大的低优先级线程,让高优先级线程先执行。并不是说高优先级的线程就一定先执行。
后台线程
什么是后台线程?
指在程序运行的时候再后台提供一种通用的服务,并且这种线程并不属于程序中不可或缺的部分,一旦非后台线程结束了,所有的后台程序也就结束了。
使用方法:1
2
3Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true); // Must call before start()
daemon.start();
线程启动前设置
后台线程中创建的线程都是后台线程。
后台线程中的finally{}语句有可能不会被执行。
对于Executor管理的线程,如何设置?
每一个静态的ExecutorService都可以接受一个ThreadFactory的工厂1
2
3
4
5
6
7public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
对于线程的基本介绍到这,现在来解决第一个问题:如何解决阻塞问题?
先看一个阻塞问题:1
2
3
4
5
6
7
8class UnresponsiveUI {
private volatile double d = 1;
public UnresponsiveUI() throws Exception {
while(d > 0)
d = d + (Math.PI + Math.E) / d;
System.in.read(); // Never gets here
}
}
使用多线程解决方案:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class ResponsiveUI extends Thread {
private static volatile Long d = 1L;
public ResponsiveUI() {
setDaemon(true);
start();
}
public void run() {
while (true) {
d++;
}
}
public static void main(String[] args) throws Exception {
//! new UnresponsiveUI(); // Must kill this process
new ResponsiveUI();
System.in.read();
// Shows progress
System.out.println(d);
}
}
线程的异常
每一个线程的异常都需要在run()方法中处理,一旦异常被抛出到线程外,都会打印到控制台。
案例:1
2
3
4
5
6
7
8
9public class ExceptionThread implements Runnable {
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
}
结果是控制台打印出异常,即便是runtime异常,即便在main()中加上try-catch语句也没用。
可是有时候如果不希望某个线程的异常影响到其它线程的执行,我也不想在run()做一些没必要的异常处理,怎么办?
JAVA5增加了Thread.UncaughtExceptionHandler接口,可以使用Thread.setUncaughtExceptionHandler()的来设置对特定线程死亡时未处理的异常的处理。
也可以使用setDefaultUncaughtExceptionHandler()来设置默认的处理器。
1 | class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { |
配合Exceutor使用的话就是在ThreadFactory的实现中调用Thread.setUncaughtExceptionHandler()方法。
共享受限的资源
线程共享资源的问题?
一个线程访问到了另一个线程中尚未处理完成的中间变量结果,导致逻辑上出现错误。
主导原因是由于 两个线程的代码发生交叉执行。
访问到中间状态值才会出现错误,访问到结果状态值不算错误。
如何解决
思路是防止访问到其它线程中的共享资源的中间状态值。
使用同步机制:
- 互斥机制:synchronized,lock
- 原子操作
字段设为private的好处是?
只允许在方法中去访问字段,然后给方法加锁来控制对资源的访问。
方法前加synchronized是给什么加锁?
是给当前的对象或者是类,这点也就意味着一个对象中有多个synchronized方法被调用,即便不是同一个方法,只一个方法调用结束了,另一个方法才能被调用。
被锁上的“对象”都会有一个对应的计数器,只有当锁的计数器为0的时候,某资源才会被访问。譬如上述中一个对象中有3个synchronized方法被调用,当前的计数器就是3。
上面的锁机制称为互斥机制:一段时间只有一个任务可以运行这段代码。
使用synchronized和使用lock的区别?
- lock具有更细粒度,可以处理关键处的代码,没有必要锁上整个代码。
- lock可以处理异常,synchronized报错了只能返回错误
- lock具有更多的锁控制方法,比如tryLock()可以设置获取锁和等待锁的时间。
什么时候需要进行同步处理?
当一个线程需要读取其它线程中修改的变量的时候。所有读取改变量的方法都要加同步处理。
这里必须强调所有,一些加一些不加,也无法产生正常的结果。
什么是原子操作和原子性?
一个不可被线程调度中断的操作被称为原子操作。这种不可分割的特性叫做原子性。
理论上说,如果一个线程只执行一个原子操作,就不会有不稳定的中间态,是不是就没有资源共享的问题了?
理解上是正确的,但是使用起来非常危险,会有以下的几个问题:
- 如何确实是一个原子操作?
- 多核处理器的情况下还会有可见性的问题。
- 使用synchronized保证的原子性只对synchronized方法有效
使用synchronized可以保证原子性
什么属于原子操作?
基本类型都具有原子性(除long和double之外),对其的读取和赋值操作,返回操作是原子操作。
对于long和double(64字节),jvm可能会分为两个32字节的指令来处理,这时候也就不是原子操作了。
队员这种基本数据类型的简单操作,可以使用volatile来保证原子性。
什么是可见性问题?
在多核处理器中,每一个处理器刻都会有一个本地缓存,导致数据没有写入到内存中,也就无法被其它线程读取,这就是可见性的问题。
可以使用volatile来保证字段的可见性。
使用synchronized可以保证可见性
如何理解使用synchronized保证的原子性只对synchronized方法有效?
案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue() {
return i;
}
private synchronized void evenIncrement() {
i++;
i++;
}
public void run() {
while (true) evenIncrement();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while (true) {
int val = at.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
上面的结果是会出现奇数值,原因是因为synchronized只保证和synchronized操作之间的原子性。
理论上说,使用原子操作确实可以代替锁来实现同步,但是由于各种各样的原因,使用原子操作是一个非常危险的尝试,如果不是专家,请不要轻易尝试。
如果情非得已需要使用原子操作,JAVA5提供了原子类来帮助大家避免危险。
原子类
AtomicInteger,AtomicLong,AtomicReference等atomic包下的类来提供原子操作。
比如Random中就使用到了原子类来保证线程安全性:1
2
3
4
5
6
7
8
9protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}
也由此可知,Random并不是真正的随机,而是根据初始参数为种子(如果未传以当前的时间为种子)来逐个生成数值:1
2
3
4
5
6
7
8public class RanomTest {
public static void main(String arg[]) {
Random random = new Random(47);
for (int i = 1; i <= 10; i++) {
System.out.println(random.nextInt(100));
}
}
}
无论运行多少次,结果都是一样的。因此,Random在多线程中最好作为static方法来使用,因此它是线程安全的。
同步代码块
synchronized(互斥量){},相比synchronized方法在性能上快很多,但还是没有lock功能强大。
线程本地存储
除了使用同步机制以外,还有第二种方法可以解决共享资源产生冲突的问题,那就是根除对变量的共享。
如何使用线程本地存储?
使用ThreadLocal类
中断
单线程的阻塞可以用多线程来解决,但是多线程的情况下也会出现阻塞,比如等待另一个线程的资源的释放。
线程都有哪些状态?
- 新建:新建的线程所处的短暂的一个状态,等待调度器把状态转变成就绪或者阻塞。
- 就绪:有时间片就可以执行
- 阻塞:某个条件阻止线程执行
- 死亡:线程停止执行,不会被调度器调度。
导致线程进入阻塞状态的原因?
- sleep()
- wait()
- 等待I/O
- 无法获取锁
有时间不希望线程无休止的阻塞下去,怎么中断?
使用interrupt()
应该注意的是,线程是否中断应该由自己来决定,interrupt()只是“给一个建议”,某些情况下,强制中断会带来很严重的后果。
因此并不是所有的阻塞状态都能被中断的,比如IO阻塞(这里指传统的IO)和锁阻塞(Lock类具有可中断的锁ReentrantLock)就属于不可中断的阻塞状态。
线程如何响应中断?
interrupt()是设置线程的一个中断信号,有两种响应的结果。
- 处于可中断的阻塞状态的线程接收到中断信号后会抛出InterruptedException异常,随后重置中断信号。
- 对于没有阻塞状态的线程不会响应这个中断信号,但是可以使用interrupted()来检查中断信号,这个方法会返回当前的中断信号,并且随后重置中断信号,因此也可以通过判断interrupted来。
中断发生在阻塞前会是什么结果?
线程协作
为什么wait()的持有者是Object而不是Thread?
我的理解是:并不是由线程来决定是否能拥有锁,而应该是由锁来决定哪一个线程可以获取到它,因此,是锁决定挂起或者唤醒一个线程,而不是由线程决定自己是否被挂起或者挂起,所以持有wait()和notify()的对象应该是锁,而所有的对象都可以是锁,因而ait()的持有者是Object,而不是Thread。
需要注意的是wait(),notify()等的使用需要在同步代码中,否则会抛出异常。
如何控制wait()发生在notify()之前?
把wait()放在某条件语句(线程A中)中,让wait()发生, 在notify()前后(线程B)中控制wait()方法被唤醒的条件。
为什么wait()要放在while中?
因为被唤醒不等于立刻就获得了锁开始执行,有可能另一被唤醒的线程获得了锁,并且再次改变(达到本线程中)了被挂起的条件,这时候如果本线程还要继续执行,就会出现问题。
Lock的线程协作方式使用
每一个Lock实例都可以使用newCondition()来获取Condition对象,这个Condition对象具有await(),signal()和signalAll方法来控制线程协作。
生产者和消费者
生产者和消费者问题是一个典型的线程协作和案例:
生产者是生产资源的任务拥有者。
消费者是消费资源的任务拥有者。
阻塞队列
当消费和生产的资源上限只有一个的时候,性能上会受到很大的局限。
concurrent包下面的BlockingQuene帮我们创建了一个强大而使用简单的资源队列的使用:
- BlockingQuene中的入队和出对操作都会加锁,所以线程的其它的地方不加锁也不会出现资源共享的问题。
- 当BlockingQuene为空的时候,会挂起消费者。当BlockingQuene满的时候会挂起生产者。
线程间的输入和输出
可以使用管道流实现,一个进程写入,另一个进程读出,没有涉及到资源共享的问题。
实际上,管道流是BlockingQuene出现之前的一种线程间安全交流的代替方法。
死锁
所有的线程都在等待某个条件并且一直等待下去,就是死锁,死锁往往是出乎意料的发生的。
经典的死锁案例——哲学家就餐问题:
基本描述指定了五位哲学家.这些哲学家把他们的一部分时间花在思考上,把一部分时间花在吃饭上。当他们思考的时候,他们不需要任何共享的资源,但是他们使用有限数量的器皿吃饭。在最初的问题描述中,餐具是叉子,需要两把叉子才能从桌子中间的碗中取出意大利面,但说餐具是筷子似乎更有意义。显然,每个哲学家都需要两把筷子才能吃饭。
作为哲学家,他们的钱很少,所以他们只能买得起五只筷子(更普遍地说,筷子的数量与哲学家的数量相同)。它们之间隔着桌子。当一个哲学家想吃东西时,那个哲学家必须拿起左手的筷子和右边的筷子。如果两边的哲学家都在用想要的筷子,我们的哲学家必须等到必要的筷子可用为止。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73public class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException {
while (taken) wait();
taken = true;
}
public synchronized void drop() {
taken = false;
notifyAll();
}
}
public class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random rand = new Random(47);
private void pause() throws InterruptedException {
if (ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}
public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
this.left = left;
this.right = right;
id = ident;
ponderFactor = ponder;
}
public void run() {
try {
while (!Thread.interrupted()) {
print(this + " " + "thinking");
pause(); // Philosopher becomes hungry
print(this + " " + "grabbing right");
right.take();
print(this + " " + "grabbing left");
left.take();
print(this + " " + "eating");
pause();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
print(this + " " + "exiting via interrupt");
}
}
public String toString() {
return "Philosopher " + id;
}
}
public class DeadlockingDiningPhilosophers {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0) ponder = Integer.parseInt(args[0]);
int size = 5;
if (args.length > 1) size = Integer.parseInt(args[1]);
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) sticks[i] = new Chopstick();
for (int i = 0; i < size; i++) exec.execute(new Philosopher(sticks[i], sticks[(i + 1) % size], i, ponder));
if (args.length == 3 && args[2].equals("timeout")) TimeUnit.SECONDS.sleep(5);
else {
System.out.println("Press ‘Enter’ to quit");
System.in.read();
}
exec.shutdownNow();
}
}
死锁的情况是:每一个哲学家都持有一双筷子,等待获取另一双筷子。
死锁发生的条件是什么?
- 有互斥的条件,就是存在只能被一个线程使用的资源。如一个筷子只能被一个哲学家使用。
- 至少有一个任务持有一个资源且等待一个被别的任务持有的资源。如哲学家持有一根筷子,等待另一个哲学家手中的筷子。
- 线程不能抢占资源。
- 必须有循环等待。如哲学家永远都是试图先去获取右手的筷子,之后再获取左手的筷子。
如何防止死锁?
破坏上面的任何一个条件即可。最容易的方法是破坏第四个,比如让其中一个哲学家颠倒一下获取筷子的顺序。
破坏其它条件的解决方案请参考更高级的讨论线程的书籍
concurrent库下的新功能
CountDownLatch
用来同步一个或者多个任务
1 | CountDownLatch(int count) |
CyclicBarrier
让多个线程在同一起跑线等待,然后分回合的并行执行。
下面是一个赛马的比赛,每一回合马前进1到3步。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) {
barrier = b;
}
public synchronized int getStrides() {
return strides;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
// Produces 0, 1 or 2
strides += rand.nextInt(3);;
}
barrier.await();
}
} catch (InterruptedException e) {
// A legitimate way to exit
} catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("=");
}
// The fence on the racetrack
System.out.println(s);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 10;
new HorseRace(nHorses, pause);
}
}
CyclicBarrier的构造器接受两个变量,第一个指定计数器,第二个指定一个栅栏任务。
当await()的线程到达指定的数量的时候,执行栅栏任务。
DelayQuene
BlockingQueue的一个变种,任务以“延迟”来排序,优先执行即将到期的任务。
队列中的资源对象需要实现Delayed接口,实现getDelay()方法。
Delayed又继承了Complarable接口,因此又需要实现compareTo()方法。
案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) {
return -1;
}
if (trigger > that.trigger) {
return 1;
}
return 0;
}
public void run() {
System.out.print(this + " ");
}
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for (DelayedTask pt : sequence) {
System.out.println(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted()) {
q.take().run();
}
// Run task with the current thread
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
}
class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for (int i = 0; i < 20; i++) {
TimeUnit.SECONDS.sleep(1L);
queue.put(new DelayedTask(rand.nextInt(5000)));
}
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
上面EndSentinel是一个终结任务,作清理工作。
TimeUtil下面的枚举类有一个convert()方法,可以很方便的做时间单位上的转换。
如果队列为空了,那么将会返回null。
PriaorityBlockingQuene
自定义优先级的BlockingQuene,需要队列中的资源实现Comparable接口。
ScheduledExecutor
定时和周期性的执行线程。
1 | // 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。 |
Semaphore
锁在任何时刻只允许一个任务访问一个资源,而信号量允许N个任务同时访问这个资源。比如“线程池”的实例。
使用方法:1
2
3
4
5
6
7
8//指定信号量,默认非公平
public Semaphore(int permits)
//指定信号量和是否公平
public Semaphore(int permits, boolean fair)
//获取信号量,获取不到则阻塞
public void acquire()
//释放信号量
public void release()
Exchanger
交换两个线程的对象数据。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class Car extends Thread {
private Exchanger<String> exchanger;
public Car(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}
public void run() {
try {
System.out.println(1 + ": " + exchanger.exchange("Car"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Bike extends Thread {
private Exchanger<String> exchanger;
public Bike(Exchanger<String> exchanger) {
super();
this.exchanger = exchanger;
}
public void run() {
try {
System.out.println(2 + ": " + exchanger.exchange("Bike"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Car car = new Car(exchanger);
Bike bike = new Bike(exchanger);
car.start();
bike.start();
System.out.println("Main end!");
}
}
仿真
- 类似:生活中适当调整工作人员数量来服务随机到来的人群。
1 | class Customer { |
性能调优
- 性能:原子操作>lock>synchronized
- 风险:原子操作>lock>synchronized
- 可读性:synchronized>lock>原子操作
如何确定同步块包含的范围?
需要包含共享资源被修改的整个过程
容器的性能进化
- java1:类似Vector和Hashtable之类的类,具有许多synchronized的方法,当他们用于非多线程的应用程序中,会导致不可接受的开销。
- java2:新容器都是不同步的,而且Collections中提供了各种同步的内部容器类,这时候的问题是在多线程的情况下,锁实现的开销依然不小。
- java5:添加的专门用于处理同步的免锁容器,采用了更加灵巧的技术实现。
免锁容器采用了什么技术?
只对修改操作加锁操作,为了防止读取操作读到不同的线程的中间状态值,修改是在容器数据结构的某个部分的一个单独的副本上执行的,这个副本在修改的过程中是不可视的,当修改完成的时候,一个原子性的操作将把新的数组换入。
为什么要采用这种处理方法?
- 解决了并发时的安全和性能问题,当修改操作较少的时候很明显。
- 解决了多个迭代器同时修改和遍历的问题
什么是多个迭代器同时修改和遍历的问题?
无论是传统线程安全的Vector,还是新容器Collection,又或者安全的新容器synchronizedCollection都没有解决 迭代器同时修改和遍历 的问题。
这个问题似乎是设计师在设计的时候 就不希望通过原来的容器类中的方法去修改容器的结构,而是通过iterator里面的方法操作。
在代码中的体现就是下面这一段:1
2
3
4final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
modCount表示容器实际被修改(add,remove,set)的次数。
expectedModCount是在迭代器中记录的被修改的次数。
因此,通过容器类中的方法修改后会改变modCount的值,而不会影响expectedModCount的值。
最终导致异常的产生。
如何防止ConcurrentModificationException异常的发生?
单线程下使用iterator里面的方法操作,remove里有以下的一段代码:1
expectedModCount = modCount;
多线程下使用iterator也无效,为什么?
- expectedModCount是成员变量,线程独享的。
- 多线程下非安全的容器会有额外的问题,如下:
1
2if (i >= elementData.length)
throw new ConcurrentModificationExcept();
多线程下如何解决?
- 迭代器加锁进行同步,使得两个迭代器的遍历不会同时发生。
- 回到最初的介绍——使用免锁容器:CopyOnWriteArrayList,CopyOnWriteArraySet,synchronizedHashMap ,ConcurrentHashMap。这就理解了为什么免锁容器要采用这样的技术了。
注意了,问题依然是存在的!我们继续分析:
- 使用迭代器:要注意System.Out.println(list)这个语句也会隐含的调用迭代器。
- 使用免锁容器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
public boolean hasNext() ;
public E next() ;
public int nextIndex();
public int previousIndex();
}
上面是免锁后期的迭代器类,可以看到使用的是snapshot快照,并且没有加锁的操作,因此当真正的elements改变的使用,迭代的依然是数组的快照。
乐观锁
乐观锁并不是加锁。而是使用原子操作compareAndSet(oldValue,newValue)实现的一个很有效的一个技巧。
思想:在使用最初获取数据和最后获取的数据进行对比,一致则操作成功,不一致则操作失败。
ReadWriteLock
1 | Lock rLock = new ReadWriteLock(ture).readLock(); |
可以同时获取多个读锁,但是写锁只有一个,一旦有线程获取到写锁,所有的读锁都会被挂起,直到写锁被释放。
性能上是不可确认的。
此章节只是冰山一角,更多的并发编程参考书籍:《JAVA Concurrency in Practice》《Concurrent Programming in Java》